package com.isyscore.os.flinksql;


import com.isyscore.os.flinksql.checkpoint.CheckPointParams;
import com.isyscore.os.flinksql.checkpoint.FsCheckPoint;
import com.isyscore.os.flinksql.enums.JobTypeEnum;
import com.isyscore.os.flinksql.model.JobRunParam;
import com.isyscore.os.flinksql.model.SqlCommandCall;
import com.isyscore.os.flinksql.sql.SqlExecutor;
import com.isyscore.os.flinksql.sql.SqlFileParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

/**
 * @author zhuhuipei
 * @Description:
 * @date 2020-06-23
 * @time 00:33
 */
public class FlinkSqlJobApplication {

    private static final Logger log = LoggerFactory.getLogger(FlinkSqlJobApplication.class);

    public static void main(String[] args) {

        try {
            Arrays.stream(args).forEach(arg -> log.info("{}", arg));
            JobRunParam jobRunParam = buildParam(args);
            String originSql = new String(Base64.getDecoder().decode(jobRunParam.getSql()), StandardCharsets.UTF_8);
            String[] sqls = originSql.split("\\r?\\n");
            List<String> sql = Arrays.asList(sqls);
            List<SqlCommandCall> sqlCommandCallList = SqlFileParser.fileToSql(sql);
            EnvironmentSettings settings = null;
            TableEnvironment tEnv = null;
            if (jobRunParam.getJobTypeEnum() != null && JobTypeEnum.SQL_BATCH.equals(jobRunParam.getJobTypeEnum())) {
                log.info("[SQL_BATCH]本次任务是批任务");
                //批处理
                settings = EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inBatchMode()
                        .build();
                tEnv = TableEnvironment.create(settings);
            } else {
                log.info("[SQL_STREAMING]本次任务是流任务");
                //默认是流 流处理 目的是兼容之前版本
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                log.info("用户自定义classpaths:" + jobRunParam.getDepClassPaths());
                if (jobRunParam.getDepClassPaths() != null && !jobRunParam.getDepClassPaths().isEmpty()) {
                    addPipelineClassPaths(jobRunParam.getDepClassPaths(), env);
                }
                settings = EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inStreamingMode()
                        .build();
                tEnv = StreamTableEnvironment.create(env, settings);
                //设置checkPoint
                FsCheckPoint.setCheckpoint(env, jobRunParam.getCheckPointParam());
            }

            StatementSet statementSet = tEnv.createStatementSet();

            SqlExecutor.exeSql(sqlCommandCallList, tEnv, statementSet);

            TableResult tableResult = statementSet.execute();

            if (tableResult == null || tableResult.getJobClient().get().getJobID() == null) {
                throw new RuntimeException("任务运行失败 没有获取到JobID");
            }

        } catch (Exception e) {
            System.err.println("任务执行失败:" + e.getMessage());
            log.error("任务执行失败：", e);
        }
    }


    private static JobRunParam buildParam(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String sql = parameterTool.get("sql");
        Preconditions.checkNotNull(sql, "-sql参数 不能为空");
        JobRunParam jobRunParam = new JobRunParam();
        jobRunParam.setSql(sql);
        String classpaths = parameterTool.get("C");
        if (!StringUtils.isBlank(classpaths)) {
            jobRunParam.setDepClassPaths(Arrays.asList(classpaths.split(",")));
        }
        jobRunParam.setCheckPointParam(CheckPointParams.buildCheckPointParam(parameterTool));
        String type = parameterTool.get("type");
        if (StringUtils.isNotEmpty(type)) {
            jobRunParam.setJobTypeEnum(JobTypeEnum.getJobTypeEnum(Integer.valueOf(type)));
        }
        return jobRunParam;
    }

    private static void addPipelineClassPaths(List<String> classPaths, StreamExecutionEnvironment env) throws NoSuchFieldException, IllegalAccessException, MalformedURLException {
        for (String classPath : classPaths) {
            loadJarFromUrl(new URL(classPath));
        }
        Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        configurationField.setAccessible(true);
        Configuration conf = (Configuration) configurationField.get(env);
        ConfigOption<List<String>> classpathsOp = ConfigOptions.key("pipeline.classpaths").stringType().asList().defaultValues("321");
        List<String> pipelineClasspaths = null;
        if (conf.getOptional(classpathsOp).isPresent()) {
            pipelineClasspaths = conf.getOptional(classpathsOp).get();
        } else {
            pipelineClasspaths = new ArrayList<>();
        }
        pipelineClasspaths.addAll(classPaths);
        conf.set(classpathsOp, pipelineClasspaths);
    }

    //动态加载Jar
    private static void loadJarFromUrl(URL jarUrl) {
        //从URLClassLoader类加载器中获取类的addURL方法
        Method method = null;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e1) {
            e1.printStackTrace();
        }
        // 获取方法的访问权限
        assert method != null;
        boolean accessible = method.isAccessible();
        try {
            //修改访问权限为可写
            if (!accessible) {
                method.setAccessible(true);
            }
            // 获取FlinkSqlJobApplication自身加载器
            URLClassLoader classLoader = (URLClassLoader) FlinkSqlJobApplication.class.getClassLoader();
            //jar路径加入到系统url路径里
            method.invoke(classLoader, jarUrl);
            log.info("load classpath:"+jarUrl);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            method.setAccessible(accessible);
        }
    }
}
